SpringBoot整合Quartz实现动态任务,及在Job中注入Bean

您所在的位置:网站首页 springboot bean注入如何加开关 SpringBoot整合Quartz实现动态任务,及在Job中注入Bean

SpringBoot整合Quartz实现动态任务,及在Job中注入Bean

2024-07-10 18:17| 来源: 网络整理| 查看: 265

需求背景

项目中有定时任务的请求,SpringBoot环境下最简单的实现是加个注解@Scheduled(cron = ""),但是这只能实现固定的定时任务。需要可以通过客户端启动、停止、修改定时间隔等功能,就需要上Quartz了。另外,一般的介绍都通过Job接口实现具体的业务功能,但是如何在Job中注入现有的bean也是一个问题。

Quzrtz简介

1.Quartz是一个开源的任务调度框架。基于定时、定期的策略来执行任务是它的核心功能,比如x年x月的每个星期五上午8点到9点,每隔10分钟执行1次。

2.Quartz有3个核心要素:调度器(Scheduler)、任务(Job)、触发器(Trigger)。

2.1.Job(任务):是一个接口,有一个方法void execute(),可以通过实现该接口来定义需要执行的任务(具体的逻辑代码)。 2.2.JobDetail:Quartz每次执行job时,都重新创建一个Job实例,会接收一个Job实现类,以便运行的时候通过newInstance()的反射调用机制去实例化Job.JobDetail是用来描述Job实现类以及相关静态信息,比如任务在scheduler中的组名等信息。 2.3.Trigger(触发器):描述触发Job执行的时间触发规则实现类SimpleTrigger和CronTrigger可以通过crom表达式定义出各种复杂的调度方案。 2.4.Calendar:是一些日历特定时间的集合。一个Trigger可以和多个 calendar关联,比如每周一早上10:00执行任务,法定假日不执行,则可以通过calendar进行定点排除。 2.5.Scheduler(调度器):代表一个Quartz的独立运行容器。Trigger和JobDetail可以注册到Scheduler中。Scheduler可以将Trigger绑定到某一JobDetail上,这样当Trigger被触发时,对应的Job就会执行。一个Job可以对应多个Trigger,但一个Trigger只能对应一个Job.

实现

pom文件中加入dependency,不需要加version,跟特定的spring版本绑定了。

org.quartz-scheduler quartz

 QuartzScheduler类,动态定时任务的核心,增加、删除、修改定时任务。

import java.util.Date; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; /** * 任务调度处理 * */ @Configuration public class QuartzScheduler { // 任务调度 @Autowired private Scheduler scheduler; /** * 开始执行所有任务 * * @throws SchedulerException */ public void startJob(String cron, String jobName) throws SchedulerException { startJob1(scheduler, cron, jobName); //startJob2(scheduler); scheduler.start(); } /** * 获取Job信息 * * @param name * @param group * @return * @throws SchedulerException */ public String getJobInfo(String name, String group) throws SchedulerException { TriggerKey triggerKey = new TriggerKey(name, group); CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey); return String.format("time:%s,state:%s", cronTrigger.getCronExpression(), scheduler.getTriggerState(triggerKey).name()); } /** * 修改某个任务的执行时间 * * @param name * @param group * @param time * @return * @throws SchedulerException */ public boolean modifyJob(String name, String group, String time) throws SchedulerException { Date date = null; TriggerKey triggerKey = new TriggerKey(name, group); CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey); String oldTime = cronTrigger.getCronExpression(); if (!oldTime.equalsIgnoreCase(time)) { CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(time); CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(name, group) .withSchedule(cronScheduleBuilder).build(); date = scheduler.rescheduleJob(triggerKey, trigger); } return date != null; } /** * 暂停所有任务 * * @throws SchedulerException */ public void pauseAllJob() throws SchedulerException { scheduler.pauseAll(); } /** * 暂停某个任务 * * @param name * @param group * @throws SchedulerException */ public void pauseJob(String name, String group) throws SchedulerException { JobKey jobKey = new JobKey(name, group); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null) return; scheduler.pauseJob(jobKey); } /** * 恢复所有任务 * * @throws SchedulerException */ public void resumeAllJob() throws SchedulerException { scheduler.resumeAll(); } /** * 恢复某个任务 * * @param name * @param group * @throws SchedulerException */ public void resumeJob(String name, String group) throws SchedulerException { JobKey jobKey = new JobKey(name, group); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null) return; scheduler.resumeJob(jobKey); } /** * 删除某个任务 * * @param name * @param group * @throws SchedulerException */ public void deleteJob(String name, String group) throws SchedulerException { JobKey jobKey = new JobKey(name, group); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null) return; scheduler.deleteJob(jobKey); } private void startJob1(Scheduler scheduler, String cron, String jobName) throws SchedulerException { // 通过JobBuilder构建JobDetail实例,JobDetail规定只能是实现Job接口的实例 // JobDetail 是具体Job实例 JobDetail jobDetail = JobBuilder.newJob(RealDataJob.class).withIdentity(jobName, jobName).build(); // 基于表达式构建触发器 CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cron); // CronTrigger表达式触发器 继承于Trigger // TriggerBuilder 用于构建触发器实例 CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobName) .withSchedule(cronScheduleBuilder).build(); scheduler.scheduleJob(jobDetail, cronTrigger); } }

如需程序启动时就启动定时任务,加入ApplicationStartQuartzJobListener类,如不需要可不加这个类。

import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SchedulerFactory; import org.quartz.impl.StdSchedulerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.ContextRefreshedEvent; @Configuration public class ApplicationStartQuartzJobListener implements ApplicationListener{ @Autowired private QuartzScheduler quartzScheduler; /** * 初始启动quartz */ @Override public void onApplicationEvent(ContextRefreshedEvent event) { try { quartzScheduler.startJob("0/10 * * * * ?", "RealData"); System.out.println("任务已经启动..."); } catch (SchedulerException e) { e.printStackTrace(); } } /** * 初始注入scheduler * @return * @throws SchedulerException */ @Bean public Scheduler scheduler() throws SchedulerException{ SchedulerFactory schedulerFactoryBean = new StdSchedulerFactory(); return schedulerFactoryBean.getScheduler(); } }

具体的执行任务的类,实现Job接口。  

import com.sfauto.cloud.gateway.mqtt.MqttConfig; import com.sfauto.cloud.gateway.mqtt.MqttPushClient; import com.sfauto.cloud.gateway.service.IMqttResponse; import lombok.extern.slf4j.Slf4j; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; @Slf4j public class RealDataJob implements Job{ private void before(){ log.info("任务开始执行"); } @Override public void execute(JobExecutionContext arg0) throws JobExecutionException { //before(); System.out.println("开始:"+System.currentTimeMillis()); IMqttResponse mqttResponse = SpringUtil.getBean(IMqttResponse.class); MqttPushClient mqttPushClient = SpringUtil.getBean(MqttPushClient.class); MqttConfig properties = SpringUtil.getBean(MqttConfig.class); // TODO 业务 String str = mqttResponse.subRealData(); String topicResp = properties.getPlattopic()+"/"+properties.getTopic().substring(0, properties.getTopic().length()-2)+"/"; mqttPushClient.pushlish(0, false, topicResp+"realdata", str); System.out.println("结束:"+System.currentTimeMillis()); //after(); } private void after(){ log.info("任务开始执行"); } }

这个类需要重点说明一下,如果在这个类里面去@Autowire某些JavaBean,是注入不进去的。Job 对象实例化的过程是通过 Quartz 内部自己完成的。可以看上面的QuartzScheduler类的实现,通过JobBuilder.newJob(Job.class)进行Job类的初始化。

但是我们通过 Spring 进行注入的 Bean 却是由 Spring 容器管理的,Quartz 内部无法感知到 Spring 容器管理的 Bean,所以没有办法在创建 Job 的时候就给装配进去。

查了一下很多实现方案都是说,需要自己定义一个JobFactory交给spring管理,同时还得将自定义的 JobFactory 设置到 Schedule中,相对来说比较复杂。

那既然我们依赖的bean是已经由spring管理的,那么我们直接取出来用不就好?也就是通过实现一个工具类去获取Spring容器中的Bean。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationEvent; import org.springframework.stereotype.Component; @Component public class SpringUtil implements ApplicationContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(SpringUtil.class); private static ApplicationContext context; @Override public void setApplicationContext(ApplicationContext context) throws BeansException { SpringUtil.context = context; } /** * 获取 Spring Bean * @param clazz 类 * @param 泛型 * @return 对象 */ public static T getBean(Class clazz) { if (clazz == null) { return null; } return context.getBean(clazz); } /** * 获取 Spring Bean * @param bean 名称 * @param 泛型 * @return 对象 */ @SuppressWarnings("unchecked") public static T getBean(String bean) { if (bean == null) { return null; } return (T) context.getBean(bean); } /** * 获取 Spring Bean * @param beanName 名称 * @param clazz 类 * @param 泛型 * @return 对象 */ public static T getBean(String beanName, Class clazz) { if (null == beanName || "".equals(beanName.trim())) { return null; } if (clazz == null) { return null; } return (T) context.getBean(beanName, clazz); } /** * 获取上下文 * @return 上下文 */ public static ApplicationContext getContext() { if (context == null) { throw new RuntimeException("There has no Spring ApplicationContext!"); } return context; } /** * 发布事件 * @param event 事件 */ public static void publishEvent(ApplicationEvent event) { if (context == null) { return; } try { context.publishEvent(event); } catch (Exception ex) { LOGGER.error(ex.getMessage()); } } }

在上面的Job类中,注入了MQTT通信的bean,定时发送MQTT消息。

程序中其他地方根据接收到的MQTT消息类型,选择启动或删除定时任务

@Autowired QuartzScheduler quartzScheduler; //...... if(param.getOpType().equals("sub")) { try { quartzScheduler.startJob("0/10 * * * * ?", "RealData"); } catch (Exception e) { log.error(e.getMessage()); } } else if(param.getOpType().equals("unsub")) { try { quartzScheduler.deleteJob("RealData", "RealData"); } catch (Exception e) { log.error(e.getMessage()); } }



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3